package com.gitee.kamismile.stoneComEx.util;

import com.google.gson.internal.LinkedTreeMap;
import com.gitee.kamismile.stone.commmon.base.ResultVO;
import com.gitee.kamismile.stone.commmon.util.JsonUtil;
import com.gitee.kamismile.stone.commmon.util.ValueUtils;
import org.apache.commons.io.IOUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOReactorException;
import org.springframework.util.StopWatch;

import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;


/**
 * Created by lidong on 2017/3/3.
 */
public class HttpAsyncClientUtils {
    private HttpAsyncClientUtils() {
    }

    private static volatile CloseableHttpAsyncClient closeableHttpAsyncClient = null;
    private static ReadWriteLock rwl = new ReentrantReadWriteLock();
    private static String userAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_7_3) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.151 Safari/535.19";

//ReentrantLock

    static CloseableHttpAsyncClient getAsynClient() {
        if (null == closeableHttpAsyncClient) {
            rwl.writeLock().lock();
            try {
                if (null == closeableHttpAsyncClient) {
                    // 自定义配置
                    RequestConfig requestConfig = RequestConfig.custom()
                            .setSocketTimeout(8000)
                            .setConnectTimeout(8000)
                            .build();
                    ConnectingIOReactor ioReactor = null;
                    try {
                        ioReactor = new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT);
                    } catch (IOReactorException e) {
                    }
                    PoolingNHttpClientConnectionManager manager = new PoolingNHttpClientConnectionManager(ioReactor);
//                    manager.setMaxTotal(880); // 整个连接池最大连接数,并不是越大越好,根据CPU核心数设置
//                    manager.setDefaultMaxPerRoute(880); // 每个route最大连接数默认值,并不是越大越好,根据CPU核心数设置
                    closeableHttpAsyncClient = HttpAsyncClients.custom()
                            .setConnectionManager(manager)
                            .setDefaultRequestConfig(requestConfig)
                            .setUserAgent(userAgent)
                            .disableCookieManagement() // 关闭cookie
                            .build();
                    // 启动连接
                    closeableHttpAsyncClient.start();
                    AsyncIdleConnectionMonitorThread monitor = new HttpAsyncClientUtils().new AsyncIdleConnectionMonitorThread(manager);
                    monitor.setDaemon(true);
                    monitor.start();

                }
            } finally {
                rwl.writeLock().unlock();
            }
        }
        return closeableHttpAsyncClient;
    }

    public static void asynGet(String url, Map<String, String> map, Consumer<? super ResultVO> action, boolean flag) {
        CloseableHttpAsyncClient httpClient = getAsynClient();
        StringBuilder urlParam = new StringBuilder(url);
        StringBuilder getParam = new StringBuilder();
        if (MapUtil.isNotEmpty(map)) {
            urlParam.append(url.contains("?") ? "&" : "?");
            Set<String> keys = map.keySet();
            int i = 0;
            for (String key : keys) {
                i++;
                try {
                    getParam.append(key).append("=").append(URLEncoder.encode(map.get(key), "UTF-8"));
                    if (keys.size() != i) {
                        getParam.append("&");
                    }
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }


        }

        url = urlParam.append(getParam.toString()).toString();
        HttpGet request = new HttpGet(url);
        httpExecute(action, httpClient, request, flag);
    }

    public static void asynGet(String url, Consumer<? super ResultVO> action, boolean flag) {
        asynGet(url, null, action, flag);
    }

    public static void asynPost(String url, Map<String, Object> requestJson, Consumer<? super ResultVO> action) {
        asynPost(url, requestJson, action, true);
    }

    public static void asynPost(String url, Map<String, Object> requestJson, Consumer<? super ResultVO> action, boolean flag) {
        CloseableHttpAsyncClient httpClient = getAsynClient();
        HttpPost request = new HttpPost(url);

        //装填参数
        List<NameValuePair> nvps = new ArrayList<NameValuePair>();
        try {
            if (requestJson != null) {
                for (Map.Entry<String, Object> entry : requestJson.entrySet()) {
                    if (entry.getValue() instanceof LinkedTreeMap) {
                        nvps.add(new BasicNameValuePair(entry.getKey(), JsonUtil.toJson(entry.getValue())));
                    } else {
                        nvps.add(new BasicNameValuePair(entry.getKey(), ValueUtils.isStringNull(entry.getValue())));
                    }

                }
            }
            //设置参数到请求对象中
            request.setEntity(new UrlEncodedFormEntity(nvps, "utf-8"));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        httpExecute(action, httpClient, request, flag);
    }


    private static void httpExecute(final Consumer<? super ResultVO> action,
                                    CloseableHttpAsyncClient httpClient, HttpUriRequest request, boolean flag) {

        httpClient.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(final HttpResponse response) {
                HttpEntity entity = response.getEntity();
                String content = null;
                Header encode = entity.getContentEncoding();
                ResultVO resultVO = new ResultVO();
                resultVO.setCode("0");
                try {
                    if (flag) {
                        InputStream in = entity.getContent();
                        try {
                            content = IOUtils.toString(in, encode == null ? "utf-8" : encode.getValue());
                            resultVO.setData(content);
                        } finally {
                            IOUtils.closeQuietly(in);
                        }

                    } else {
                        resultVO.setData(entity.getContent());
                    }

                    try {
                        resultVO.setCode(JsonUtil.jsonToMapString(content).get("code"));
                    } catch (Exception e) {

                    }

                    action.accept(resultVO);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(final Exception ex) {
                ResultVO resultVO = new ResultVO();
                resultVO.setCode("2");
                action.accept(resultVO);
                ex.printStackTrace();
            }

            @Override
            public void cancelled() {
                ResultVO resultVO = new ResultVO();
                resultVO.setCode("3");
                action.accept(resultVO);
            }
        });
    }

    // Watches for stale connections and evicts them.
    private class AsyncIdleConnectionMonitorThread extends Thread {

        private final PoolingNHttpClientConnectionManager connMgr;

        public AsyncIdleConnectionMonitorThread(PoolingNHttpClientConnectionManager connMgr) {
            super();
            super.setDaemon(true);
            this.connMgr = connMgr;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    synchronized (this) {
                        wait(5000);
                        // Close expired connections
                        connMgr.closeExpiredConnections();
                        // Optionally, close connections
                        // that have been idle longer than 30 sec
                        connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
                    }
                }
            } catch (InterruptedException ex) {
                // terminate
            }
        }
    }


    public static void main(String[] args) throws Exception {
//        String[] lines = FileUtils.readLines("d://1.txt");
//        List<Map<String, String>> list = new ArrayList<>();
//        for (String line : lines) {
//            String ss = StringUtils.trimToNull(line);
//            String key = ss.split("\\(")[0];
//            String value = ss.split("\\(")[1];
//            String tcName = value.split(",")[0];
//            String tcCode = value.split(",")[1];
//            Map<String, String> map = new HashMap<String, String>();
//            map.put("key", key);
//            map.put("tcName", tcName.replaceAll("\"", ""));
//            map.put("tcCode", tcCode.replaceAll("\"", ""));
//            list.add(map);
//        }
//
//
////        String a="{\"appId\":\"1\",\"json\":{\"a\":\"adf\"},\"verify\":\"83FAB91F2A3F5419558842723AB78576\",\"cmd\":\"user@goods@time\",\"account\":"+0
////                +",\"token\":\"fd93a874-6da0-43fe-8879-cb6d7eacd650\"}";
////
//
//        Map map = new HashMap<String, String>();
//        map.put("client_id", "824d3a052f6ebcf1");
////        map.put("sign_key","66dce553441f965dabe36ba524126c9c");
//        map.put("timestamp", String.valueOf(System.currentTimeMillis()));
//        String bb = SecretCodeUtil.signSHA1(map, "sign_key", "cb0caa2306a22011073c2ec124587f1d");
//        map.put("sign", bb);
//
//        map.remove("sign_key");
//        HttpAsyncClientUtils.asynGet("https://sandbox-cop.caocaokeji.cn/v2/common/getAllCities", map, e -> {
////            for(String line:lines){
////                System.out.println(line);
////            }
//            System.out.println(e.getData());
//
//            Map<String, Object> json = JsonUtil.jsonToMap(e.getData().toString());
//            List<Map> aa = (List<Map>) ((Map) json.get("data")).get("cities");
//            aa.stream().forEach(v -> {
//                String msg = "{0},{1},{2},{3},{4},{5},{6}";
//                System.out.println(MessageFormat.format(msg, v.get("cityCode"), v.get("cityName"), v.get("luxurious"), v.get("commercial"), v.get("comfortable"), v.get("newEnergy"), v.get("taxi")));
//            });
//
//            List<Map<String, String>> result = list.stream().filter(j ->
//                    aa.stream().filter(f -> {
//                                if (f.get("cityName").equals(j.get("tcName"))) {
//                                    j.put("ccName", f.get("cityName").toString());
//                                    j.put("ccCode", f.get("cityCode").toString());
//                                    return true;
//                                }
//                                return false;
//                            }
//                    ).findFirst().isPresent()
//            ).collect(Collectors.toList());
//
////            result.stream().forEach(v -> {
////                String msg = "{0}(\"{1}\",\"{2}\",\"{3}\",\"{4}\"),";
////                System.out.println(MessageFormat.format(msg, v.get("key"), v.get("tcName"), v.get("tcCode"), v.get("ccName"), v.get("ccCode")));
////            });
//
//            aa.stream().forEach(
//                    v -> {
//
//                        if (!list.stream().filter(f -> f.get("tcName").equals(v.get("cityName"))).findFirst().isPresent()) {
//                            System.out.println(v);
//                        }
//
//                    }
//
//            );
//
//        }, true);
//        long c = System.currentTimeMillis();
        int a = 2000;
        CountDownLatch countDownLatch = new CountDownLatch(a);
//        CyclicBarrier barrier=new CyclicBarrier(4);
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        for (int i = 0; i < a; i++) {
            int finalI = i;
//            Thread.sleep(1000);
//            new Fiber(() -> {
                HttpAsyncClientUtils.asynPost("https://www.baidu.com", null, e -> {
                    System.out.println(e.getCode());
//                System.out.println(e.getData());
                    countDownLatch.countDown();
//                System.out.println(finalI);
//                try {
////                    barrier.await();
//                    System.out.println("go");
//                } catch (InterruptedException ex) {
//                    ex.printStackTrace();
//                } catch (BrokenBarrierException ex) {
//                    ex.printStackTrace();
//                }
//                System.out.println("---"+finalI);
                });
//            }).start();
        }
        countDownLatch.await();
        stopWatch.stop();
        System.out.println(stopWatch.getTotalTimeSeconds());
    }

}
